package com.example.javaapiclient.repository.impl;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.*;
import co.elastic.clients.elasticsearch._types.aggregations.Aggregate;
import co.elastic.clients.elasticsearch._types.aggregations.Aggregation;
import co.elastic.clients.elasticsearch._types.query_dsl.Operator;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch._types.query_dsl.TextQueryType;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.get.GetResult;
import co.elastic.clients.elasticsearch.core.mget.MultiGetResponseItem;
import co.elastic.clients.elasticsearch.core.search.*;
import co.elastic.clients.json.JsonData;
import co.elastic.clients.transport.endpoints.BooleanResponse;
import com.alibaba.fastjson.JSON;
import com.example.javaapiclient.entity.common.AggregationParamEntity;
import com.example.javaapiclient.entity.common.BaseEsEntity;
import com.example.javaapiclient.entity.common.BulkResponseResult;
import com.example.javaapiclient.exception.BizException;
import com.example.javaapiclient.repository.ElasticsearchTemplate;
import com.example.javaapiclient.strategy.aggregation.ParseAggregationService;
import com.example.javaapiclient.utils.ElasticSearchHelpUtils;
import com.example.javaapiclient.utils.EmptyUtil;
import com.example.javaapiclient.utils.SpringContextUtil;
import com.github.pagehelper.PageInfo;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;


/**
 * Elasticsearch 增删改查基础功能
 **/
@Component
@Slf4j
public class ElasticsearchTemplateImpl<T extends BaseEsEntity, M> implements ElasticsearchTemplate<T, M> {
    //非分页，默认的查询条数
    private static final int DEFALT_PAGE_SIZE = 200;
    //搜索建议默认条数
    private static final int COMPLETION_SUGGESTION_SIZE = 10;
    //SCROLL 查询上下文有效时间 默认给10分钟
    private static final String DEFAULT_SCROLL_TIME = "10m";
    //SCROLL查询 每页默认条数
    private static final int DEFAULT_SCROLL_PERPAGE = 100;
    //默认编码
    private static final String DEFAULT_CHARSET = "UTF-8";
    //使用scroll查询返回结果的map中scrollId的key
    public static final String SCROLL_ID = "scrollId";
    //使用scroll查询返回结果的map中结果集的可以
    public static final String SCROLL_SEARCH_RESULT = "result";

    @Resource
    private ElasticsearchClient client;

    @Resource
    private ElasticSearchHelpUtils<T> elasticSearchHelpUtils;

    /**
     * 插入或更新文档，全量覆盖
     * id不存在插入文档
     * id存在更新文档
     * @param t 数据对象
     * @return true：插入成功 false：插入失败
     * @throws BizException 自定义异常
     */
    @Override
    public boolean saveOrUpdateCover(T t) throws BizException {
        if (EmptyUtil.isEmpty(t)){
            return false;
        }
        String indexName = elasticSearchHelpUtils.getIndexName(t);
        String dataJson = JSON.toJSONString(elasticSearchHelpUtils.objectToMap(t));
        try {
            Reader input = new StringReader(dataJson);
            IndexRequest<JsonData> request = IndexRequest.of(i -> i
                    .index(indexName)
                    .id(t.getId())
                    .withJson(input)
            );

            IndexResponse response = client.index(request);
            if (EmptyUtil.isEmpty(response)){
                return false;
            }
            log.info("文档执行save方法，返回结果：{}",JSON.toJSONString(response));
            return response.version()>0;
        }catch (IOException e){
            log.error("文档id:{} 索引到ES出错 出错原因：",t.getId(),e);
            throw new BizException(0,"索引到ES出错 出错原因："+e.getMessage());
        }
    }


    /**
     * 批量执行插入或修改操作，全量覆盖
     * id不存在插入文档
     * id存在更新文档
     * @param paramList
     * @return
     * @throws BizException
     */
    @Override
    public List<BulkResponseResult> saveOrUpdateCover(List<T> paramList) throws BizException {
        if (EmptyUtil.isEmpty(paramList)) {
            return Collections.emptyList();
        }
        BulkRequest.Builder bulkRequestBuilder = new BulkRequest.Builder();
        List<BulkOperation> bulkOperationList = new ArrayList<>();

        for (T t:paramList) {
            String indexName = elasticSearchHelpUtils.getIndexName(t);
            String dataJson = JSON.toJSONString(elasticSearchHelpUtils.objectToMap(t));
            bulkOperationList.add(
                    BulkOperation.of(
                            bulkOperation->bulkOperation.index(
                                    index->index.index(indexName).id(t.getId()).document(JsonData.fromJson(dataJson))
                            )
                    )
            );
        }
        bulkRequestBuilder.operations(bulkOperationList);
        try {
            BulkResponse bulkResponse = client.bulk(bulkRequestBuilder.build());
            log.info("文档执行save 批量操作方法，返回结果：{}",JSON.toJSONString(bulkResponse));
            return elasticSearchHelpUtils.convertBulkResponseResult(bulkResponse);
        } catch (IOException e) {
            log.error("文档批量索引到ES出错 出错原因：",e);
            throw new BizException(0,"文档批量索引到ES出错 出错原因："+e.getMessage());
        }
    }

    /**
     * 文档ID存在则更新 不存在更插入
     * 只更新有值的字段
     * @param paramList
     * @return
     * @throws BizException
     */
    @Override
    public List<BulkResponseResult> upsertBulkCover(List<T> paramList) throws BizException {
        if (EmptyUtil.isEmpty(paramList)) {
            return Collections.emptyList();
        }

        BulkRequest.Builder bulkRequestBuilder = new BulkRequest.Builder();
        List<BulkOperation> bulkOperationList = new ArrayList<>();

        for (T t:paramList) {
            String indexName = elasticSearchHelpUtils.getIndexName(t);
            String dataJson = JSON.toJSONString(elasticSearchHelpUtils.objectToMap(t));
            bulkOperationList.add(
                    BulkOperation.of(
                            bulkOperation->bulkOperation.update(
                                    update->update.index(indexName).id(t.getId())
                                            .action(updateAction->updateAction.docAsUpsert(true).doc(JsonData.fromJson(dataJson)))
                            )
                    )
            );
        }
        bulkRequestBuilder.operations(bulkOperationList);


        try {
            BulkResponse bulkResponse = client.bulk(bulkRequestBuilder.build());
            log.info("文档执行批量upsert方法，返回结果：{}",JSON.toJSONString(bulkResponse));
            return elasticSearchHelpUtils.convertBulkResponseResult(bulkResponse);
        } catch (IOException e) {
            log.error("文档执行批量upsert方法出错 出错原因：",e);
            throw new BizException(0,"文档执行批量upsert方法出错 出错原因："+e.getMessage());
        }
    }

    /**
     * 根据id删除
     * id不存在时返回false
     * @param id
     * @return
     * @throws BizException
     */
    @Override
    public boolean deleteById(M id, Class<T> clazz) throws BizException {
        if (EmptyUtil.isEmpty(id)){
            return false;
        }
        String indexName = elasticSearchHelpUtils.getIndexName(clazz);
        try {
            DeleteResponse deleteResponse = client.delete(deleteRequest->deleteRequest.index(indexName).id(id.toString()));
            if (EmptyUtil.isEmpty(deleteResponse)){
                return false;
            }
            return deleteResponse.result() == Result.Deleted;
        }catch (IOException e){
            log.error("根据id：{}删除文档出错 出错原因：",id,e);
            throw new BizException(0,"删除文档出错 出错原因："+e.getMessage());
        }
    }

    /**
     * 根据id集合批量删除
     * @param ids
     */
    @Override
    public List<BulkResponseResult> deleteByIds(List<M> ids,Class<T> clazz) throws BizException {
        if (EmptyUtil.isEmpty(ids)) {
            return Collections.emptyList();
        }

        BulkRequest.Builder bulkRequestBuilder = new BulkRequest.Builder();
        List<BulkOperation> bulkOperationList = new ArrayList<>();

        String indexName = elasticSearchHelpUtils.getIndexName(clazz);
        for (M id:ids) {
            bulkOperationList.add(
                    BulkOperation.of(
                            bulkOperation->bulkOperation.delete(
                                    deleteOperation->deleteOperation.index(indexName).id(id.toString())
                            )
                    )
            );
        }
        bulkRequestBuilder.operations(bulkOperationList);
        try {
            BulkResponse bulkResponse = client.bulk(bulkRequestBuilder.build());
            log.info("文档执行批量删除操作，返回结果：{}",JSON.toJSONString(bulkResponse));
            return elasticSearchHelpUtils.convertBulkResponseResult(bulkResponse);
        }catch (IOException e){
            log.error("根据id集合：{}批量删除文档出错 出错原因：",ids,e);
            throw new BizException(0,"批量删除文档出错 出错原因："+e.getMessage());
        }
    }

    /**
     * Delete by Query 根据查询结果删除
     * 返回结果：删除了多少条记录
     * @param query 查询条件实体
     * @param clazz 索引实体
     * @return
     * @throws BizException
     */
    @Override
    public Long deleteByQuery(Query query, Class<T> clazz) throws BizException {
        if (EmptyUtil.isEmpty(query)){
            return 0L;
        }
        String indexName = elasticSearchHelpUtils.getIndexName(clazz);
        try {
            DeleteByQueryResponse deleteByQueryResponse = client.deleteByQuery(deleteByQueryRequest->
                    deleteByQueryRequest.query(query).index(indexName).conflicts(Conflicts.Proceed)
            );
            return deleteByQueryResponse.deleted();
        } catch (IOException e) {
            log.error("根据查询条件删除文档出错出错 出错原因：",e);
            throw new BizException(0,"根据查询条件删除文档出错出错 出错原因："+e.getMessage());
        }
    }

    @Override
    public T getById(M id, Class<T> clazz) throws BizException {
        if (EmptyUtil.isEmpty(id)){
            return null;
        }
        String indexName = elasticSearchHelpUtils.getIndexName(clazz);
        try {
            GetResponse getResponse = client.get(getRequest->getRequest.id(id.toString()).index(indexName), Map.class);
            if (EmptyUtil.isEmpty(getResponse) || !getResponse.found()) {
                return null;
            }
            return JSON.parseObject(JSON.toJSONString(getResponse.source()), clazz);
        } catch (IOException e) {
            log.error("根据id：{}查询文档出错 出错原因：",id,e);
            throw new BizException(0,"根据ID查询文档出错 出错原因："+e.getMessage());
        }
    }

    @Override
    public List<T> mGetById(List<M> ids, Class<T> clazz) throws BizException {
        if (EmptyUtil.isEmpty(ids)){
            return Collections.emptyList();
        }
        String indexName = elasticSearchHelpUtils.getIndexName(clazz);
        List<String> idList = new ArrayList<>();
        for (M id:ids) {
            idList.add((String) id);
        }
        try {
            MgetResponse response = client.mget(
                    mgetRequest->mgetRequest.index(indexName).ids(idList),
                    Map.class);
            if (EmptyUtil.isEmpty(response)){
                return Collections.emptyList();
            }
            List<T> list = new ArrayList<>();
            for (int i = 0; i < response.docs().size(); i++) {
                MultiGetResponseItem item = (MultiGetResponseItem)response.docs().get(i);
                if (item.isFailure()){
                    continue;
                }
                GetResult getResult = item.result();
                if (getResult.found()) {
                    log.info("map:"+getResult.source());
                    list.add(JSON.parseObject(JSON.toJSONString(getResult.source()), clazz));
                }
            }
            return list;
        }catch (IOException e){
            log.error("根据ids：{}查询文档出错 出错原因：",ids,e);
            throw new BizException(0,"批量查询文档出错 出错原因："+e.getMessage());
        }
    }

    @Override
    public boolean exists(M id, Class<T> clazz) throws BizException {
        if (EmptyUtil.isEmpty(id)){
            return false;
        }
        String indexName = elasticSearchHelpUtils.getIndexName(clazz);
        try {
            BooleanResponse booleanResponse = client.exists(existsRequest -> existsRequest.index(indexName).id(JsonData.of(id).toString()).storedFields("_none_"));
            return booleanResponse.value();
        }catch (IOException e){
            log.error("根据id：{}查询文档是否存在 出错原因：",id,e);
            throw new BizException(0,"根据ID查询文档是否存在出错 出错原因："+e.getMessage());
        }
    }

    /**
     * 根据id修改，值为空会被修改成空值、默认值
     * @param t
     * @return
     * @throws BizException
     */
    @Override
    public boolean updateById(T t) throws BizException {
        if (EmptyUtil.isEmpty(t) || EmptyUtil.isEmpty(t.getId())){
            return false;
        }
        String indexName = elasticSearchHelpUtils.getIndexName(t);
        String dataJson = JSON.toJSONString(elasticSearchHelpUtils.objectToMap(t));
        try {
            UpdateRequest.Builder builder = new UpdateRequest.Builder<>();
            builder.index(indexName);
            builder.id(t.getId());
            builder.doc(JsonData.fromJson(dataJson));
            UpdateResponse updateResponse = client.update(builder.build(), JsonData.class);
            log.info("文档执行修改方法，返回结果：{}",JSON.toJSONString(updateResponse));
            return updateResponse.result() == Result.Updated;
        }catch (IOException e){
            log.error("根据id：{}修改文档出错 出错原因：",t.getId(),e);
            throw new BizException(0,"修改文档出错出错 出错原因："+e.getMessage());
        }
    }

    /**
     * 根据查询条件更新数据
     *
     * @param query 查询参数
     * @param updateParams    更新参数(如果有多个条件、条件之间是并且关系)
     * @param clazz       索引实体
     * @return 修改的条数
     * @throws Exception
     */
    @Override
    public Long updateByQuery(Query query, Class<T> clazz,Map<String,Object> updateParams) throws BizException {

        if (EmptyUtil.isEmpty(query) || EmptyUtil.isEmpty(updateParams)){
            return 0L;
        }

        List<String> updateList = new ArrayList<>();
        updateParams.forEach((k,v)->{
            if (v instanceof String || v instanceof LocalDateTime){
                updateList.add("ctx._source." + k + "='" + v+"'");
            }else{
                updateList.add("ctx._source." + k + "=" + v);
            }
        });
        String updateSource = String.join(";", updateList);

        String indexName = elasticSearchHelpUtils.getIndexName(clazz);
        UpdateByQueryRequest.Builder builder = new UpdateByQueryRequest.Builder();
        builder.query(query);
        builder.index(indexName);
        builder.conflicts(Conflicts.Proceed);
        builder.script(script->script
                .inline(inlineScript->inlineScript
                        .lang(ScriptLanguage.Painless)
                        .source(updateSource)
                )
        );

        try {
            UpdateByQueryResponse updateByQueryResponse = client.updateByQuery(builder.build());
            return updateByQueryResponse.updated();
        } catch (IOException e) {
            log.error("根据查询条件修改文档出错 出错原因：",e);
            throw new BizException(0,"根据查询条件修改文档出错 出错原因："+e.getMessage());
        }
    }

    /**
     * 根据查询条件查询  返回原始结果集，需要自行处理
     *
     * @param searchRequestBuilder
     * @return
     * @throws Exception
     */
    @Override
    public SearchResponse searchOrigin(SearchRequest.Builder searchRequestBuilder, Class<T> clazz) throws BizException {
        if (EmptyUtil.isEmpty(searchRequestBuilder)){
            return null;
        }
        String indexName = elasticSearchHelpUtils.getIndexName(clazz);
        searchRequestBuilder.index(indexName);
        try {
            return client.search(searchRequestBuilder.build(), JsonData.class);
        } catch (IOException e) {
            log.error("根据查询条件查询文档出错 出错原因：",e);
            throw new BizException(0,"根据查询条件查询文档出错 出错原因："+e.getMessage());
        }
    }

    /**
     * 根据查询条件统计总数
     *
     * @param query
     * @return
     */
    public Long countTotal(Query query, Class<T> clazz) throws BizException {
        try {
            String indexName = elasticSearchHelpUtils.getIndexName(clazz);
            CountResponse count = client.count(countRequest->countRequest.query(query).index(indexName));
            return count.count();
        } catch (IOException e) {
            log.error("根据查询条件统计总数出错 出错原因：",e);
            throw new BizException(0,"根据查询条件统计总数出错 出错原因："+e.getMessage());
        }
    }

    /**
     * 根据查询条件查询,查询指定分页范围的数据集,不包含分页信息
     *
     * @param searchRequestBuilder
     * @param clazz
     * @return
     * @throws Exception
     */
    @Override
    public List<T> searchList(SearchRequest.Builder searchRequestBuilder, Class<T> clazz) throws BizException {
        if (EmptyUtil.isEmpty(searchRequestBuilder)){
            return Collections.emptyList();
        }

        String indexName = elasticSearchHelpUtils.getIndexName(clazz);
        searchRequestBuilder.index(indexName);
        try {
            SearchResponse<Map> searchResponse = client.search(searchRequestBuilder.build(), Map.class);
            List<Hit<Map>> hits = searchResponse.hits().hits();
            if (EmptyUtil.isEmpty(hits)){
                return Collections.emptyList();
            }
            List<T> list = new ArrayList<>();
            for (Hit<Map> hit : hits) {
                Map source = hit.source();
                list.add(JSON.parseObject(JSON.toJSONString(source),clazz));
            }
            return list;
        }catch (IOException e){
            log.error("根据查询条件查询结果出错 出错原因：",e);
            throw new BizException(0,"根据查询条件查询结果出错 出错原因："+e.getMessage());
        }
    }

    /**
     * 根据查询条件分页查询,包含分页信息
     *
     * @param searchRequestBuilder
     * @param clazz
     * @return
     * @throws Exception
     */
    @Override
    public PageInfo<T> searchPage(SearchRequest.Builder searchRequestBuilder,int pageNo ,int pageSize , Class<T> clazz) throws BizException {

        String indexName = elasticSearchHelpUtils.getIndexName(clazz);
        searchRequestBuilder.index(indexName);
        searchRequestBuilder.from((pageNo-1)*pageSize);
        searchRequestBuilder.size(pageSize);

        try {
            SearchResponse<Map> searchResponse = client.search(searchRequestBuilder.build(), Map.class);
            List<Hit<Map>> hits = searchResponse.hits().hits();
            List<T> list = new ArrayList<>();
            for (Hit<Map> hit : hits) {
                Map source = hit.source();
                list.add(JSON.parseObject(JSON.toJSONString(source), clazz));
            }

            PageInfo<T> pageInfo = new PageInfo<>();
            pageInfo.setList(list);
            pageInfo.setPageNum(pageNo);
            pageInfo.setPageSize(pageSize);
            pageInfo.setTotal(searchResponse.hits().total().value());
            pageInfo.setPages(elasticSearchHelpUtils.getTotalPages(searchResponse.hits().total().value(), pageSize));
            return pageInfo;
        }catch (IOException e){
            log.error("根据查询条件查询结果出错 出错原因：",e);
            throw new BizException(0,"根据查询条件查询结果出错 出错原因："+e.getMessage());
        }

    }

    @Override
    public List<String> completionSuggest(String fieldName, String fieldValue, Class<T> clazz) throws BizException {

        String indexName = elasticSearchHelpUtils.getIndexName(clazz);
        SearchRequest.Builder builder = new SearchRequest.Builder();
        builder.suggest(suggester->suggester
                .suggesters("suggest_" + fieldName,
                        fieldSuggester->fieldSuggester.completion(
                                completionSuggester->completionSuggester
                                        .field(fieldName+".completion")
                                        .skipDuplicates(true)
                                        .size(COMPLETION_SUGGESTION_SIZE)

                        ).text(fieldValue)))
               .index(indexName);

        try {
            SearchResponse searchResponse = client.search(builder.build(),Map.class);
            Map<String, List<Suggestion<Map>>> suggest = searchResponse.suggest();
            List<Suggestion<Map>> completionSuggestion = suggest.getOrDefault("suggest_" + fieldName,new ArrayList<>());
            List<String> list = new ArrayList<>();
            for (Suggestion<Map> entry : completionSuggestion) {
                CompletionSuggest<Map> completion = entry.completion();
                List<CompletionSuggestOption<Map>> options = completion.options();
                for (CompletionSuggestOption<Map> option:options) {
                    list.add(option.text());
                }
            }
            return list;
        }catch (IOException e){
            log.error("使用completionSuggest搜索推荐内容出错 搜索字段名：{} 搜索字段值：{} 错误原因：{}",fieldName,fieldValue,e);
            throw new BizException(0,"搜索推荐内容出错 搜索字段名："+fieldName+" 搜索字段值："+fieldValue+" 出错原因："+e.getMessage());
        }

    }

    @Override
    public List<String> completionSearchAsYouType(String fieldName, String fieldValue, Class<T> clazz) throws BizException {
        String indexName = elasticSearchHelpUtils.getIndexName(clazz);
        SearchRequest.Builder builder = new SearchRequest.Builder();
        builder.index(indexName);
        builder.query(queryBuilder->
                queryBuilder.multiMatch(multiMatchQuery->multiMatchQuery
                        .query(fieldValue)
                        .fields(fieldName+".search_as_you_type",fieldName+".search_as_you_type._2gram",fieldName+".search_as_you_type._3gram")
                        .operator(Operator.And)
                        .type(TextQueryType.BoolPrefix)
                )
        );
        builder.size(COMPLETION_SUGGESTION_SIZE);

        try {
            SearchResponse<Map> searchResponse = client.search(builder.build(), Map.class);
            List<Hit<Map>> searchHits = searchResponse.hits().hits();

            if (EmptyUtil.isEmpty(searchHits)){
                return Collections.emptyList();
            }
            List<String> list = new ArrayList<>();
            for (Hit<Map> hit:searchHits) {
                Map<String, Object> sourceAsMap = hit.source();
                list.add(sourceAsMap.getOrDefault(fieldName,"").toString());
            }

            return list.stream().filter(EmptyUtil::isNotEmpty).distinct().collect(Collectors.toList());
        }catch (IOException e){
            log.error("使用completionSearchAsYouType搜索推荐内容出错 搜索字段名：{} 搜索字段值：{} 错误原因：{}",fieldName,fieldValue,e);
            throw new BizException(0,"搜索推荐内容出错 搜索字段名："+fieldName+" 搜索字段值："+fieldValue+" 出错原因："+e.getMessage());
        }
    }

    @Override
    public Map<String,Object> scroll(SearchRequest.Builder builder,String scrollId, Class<T> clazz) throws BizException {
        return scroll(builder,scrollId ,clazz, DEFAULT_SCROLL_TIME);
    }

    @Override
    public Map<String,Object> scroll(SearchRequest.Builder builder,String scrollId, Class<T> clazz, String time) throws BizException {
        if (EmptyUtil.isEmpty(builder) && EmptyUtil.isEmpty(scrollId)) {
            return Collections.emptyMap();
        }
        Map<String,Object> map = null;
        if (EmptyUtil.isEmpty(scrollId)){
            //第一次查询使用scroll查询没有scrollId,需要调用创建scrollId的请求
            //把ES服务端返回的scrollId返回给调用方，查询后面的数据时只需要传入scrollId即可
            map = scrollSearchNoScrollId(builder, clazz, time);
        }else{
            //根据之前返回的scrollId查询下一页数据
            map = scrollSearchHaveScrollId(scrollId, clazz, time);
        }
        //判断是否需要清除scrollId,需要清除则执行清除操作
        clearScrollId(map);
        return map;
    }

    /**
     * 没有scrollId时,创建scrollId并返回第一个页的数据
     * @param builder
     * @param clazz
     * @param time
     * @return
     * @throws BizException
     */
    private Map<String,Object> scrollSearchNoScrollId(SearchRequest.Builder builder, Class<T> clazz, String time) throws BizException{
        String indexName = elasticSearchHelpUtils.getIndexName(clazz);
        builder.index(indexName);
        //scroll请求每次都会生成一个新的scroll_id,大量请求时会创建很多的scroll_id，因此上下文有效时间不宜过长，否则会长时间占用内存和文件句柄
        builder.scroll(timeBuilder->timeBuilder.time(time));
        try {
            SearchResponse searchResponse = client.search(builder.build(),Map.class);
            return getMapResult(searchResponse,clazz);
        }catch (IOException e) {
            log.error("使用Scroll查询内容出错 错误原因：",e);
            throw new BizException(0,"使用Scroll查询内容出错 出错原因："+e.getMessage());
        }
    }

    /**
     * 有scrollId时,使用scrollId查询后面页数的数据集
     * @param scrollId
     * @param clazz
     * @param time
     * @return
     * @throws BizException
     */
    private Map<String,Object> scrollSearchHaveScrollId(String scrollId, Class<T> clazz, String time) throws BizException{
        try {
            //scroll请求每次都会生成一个新的scroll_id,大量请求时会创建很多的scroll_id，因此上下文有效时间不宜过长，否则会长时间占用内存和文件句柄
            ScrollResponse<Map> scrollResponse = client.scroll(scrollRequest -> scrollRequest
                            .scrollId(scrollId)
                            .scroll(timeBuilder -> timeBuilder.time(time))
                    , Map.class);
            return getMapResultByScrollResponse(scrollResponse,clazz);
        }catch (IOException e){
            log.error("使用Scroll_ID查询内容出错 错误原因：",e);
            throw new BizException(0,"使用Scroll_ID查询内容出错 出错原因："+e.getMessage());
        }
    }

    /**
     * 将查询到的结果集组装到map中返回
     * @param searchResponse
     * @param clazz
     * @return
     */
    private Map<String,Object> getMapResult(SearchResponse searchResponse,Class<T> clazz){
        Map<String,Object> map = new HashMap<>();

        if (EmptyUtil.isEmpty(searchResponse)){
            map.put(SCROLL_SEARCH_RESULT, null);
            map.put(SCROLL_ID, null);
            return map;
        }
        List<T> list = new ArrayList<>();
        List<Hit<T>> searchHits = searchResponse.hits().hits();
        if (EmptyUtil.isEmpty(searchHits)) {
            map.put(SCROLL_SEARCH_RESULT, null);
            map.put(SCROLL_ID, null);
            return map;
        }
        for (Hit<T> hit : searchHits) {
            list.add(JSON.parseObject(JSON.toJSONString(hit.source()), clazz));
        }
        //结果集
        map.put(SCROLL_SEARCH_RESULT, list);
        //scrollId,查询后面页数据使用
        map.put(SCROLL_ID, searchResponse.scrollId());
        return map;
    }

    /**
     * 将查询到的结果集组装到map中返回
     * @param scrollResponse
     * @param clazz
     * @return
     */
    private Map<String,Object> getMapResultByScrollResponse(ScrollResponse scrollResponse,Class<T> clazz){
        Map<String,Object> map = new HashMap<>();

        if (EmptyUtil.isEmpty(scrollResponse)){
            map.put(SCROLL_SEARCH_RESULT, null);
            map.put(SCROLL_ID, null);
            return map;
        }
        List<T> list = new ArrayList<>();
        List<Hit<T>> searchHits = scrollResponse.hits().hits();
        if (EmptyUtil.isEmpty(searchHits)) {
            map.put(SCROLL_SEARCH_RESULT, null);
            map.put(SCROLL_ID, scrollResponse.scrollId());
            return map;
        }
        for (Hit<T> hit : searchHits) {
            list.add(JSON.parseObject(JSON.toJSONString(hit.source()), clazz));
        }
        //结果集
        map.put(SCROLL_SEARCH_RESULT, list);
        //scrollId,查询后面页数据使用
        map.put(SCROLL_ID, scrollResponse.scrollId());
        return map;
    }

    /**
     * 如果结果集为空将scrollId清除
     * @param map
     */
    private void clearScrollId(Map<String,Object> map){
        if (EmptyUtil.isEmpty(map)){
            return;
        }
        if (EmptyUtil.isNotEmpty(map.getOrDefault(SCROLL_SEARCH_RESULT,null))){
            return;
        }
        if (EmptyUtil.isEmpty(map.getOrDefault(SCROLL_ID,null))){
            return;
        }
        try {
            ClearScrollResponse clearScrollResponse = client.clearScroll(clearScrollRequest -> clearScrollRequest.scrollId((String) map.get(SCROLL_ID)));
            boolean succeeded = clearScrollResponse.succeeded();
            log.info("清除 ScrollId返回结果:{}",succeeded);
        }catch (Exception e){
            //出错不能影响查询结果，捕获异常即可
            log.error("清除scrollId出错");
        }
    }

    /**
     * 通用聚合查询
     * @param query 查询条件
     * @param aggParam 最外成聚合，外层有几个聚合，集合中需要几个对象，参数结果如下示例
     *         List<AggregationParamEntity> aggParam = new ArrayList<>();
     *         aggParam.add(AggregationParamEntity.builder().name("user_name_agg").type(ESAggregationType.terms).field("user_name").subAgg(Arrays.asList(AggregationParamEntity.builder().name("user_age_avg_agg").type(ESAggregationType.avg).field("user_age").build(),AggregationParamEntity.builder().name("weight_max_agg").type(ESAggregationType.max).field("weight").build())).build());
     *         aggParam.add(AggregationParamEntity.builder().name("weight_avg_agg").type(ESAggregationType.avg).field("weight").build());
     * @param clazz
     * @return 原始集合结果集，未做处理
     * @throws BizException
     */
    @Override
    public Map<String, Aggregate> aggregation(Query query, List<AggregationParamEntity> aggParam, Class<T> clazz) throws BizException {

        if (EmptyUtil.isEmpty(aggParam)){
            throw new BizException(0,"聚合参数不能为空");
        }

        String indexName = elasticSearchHelpUtils.getIndexName(clazz);

        Map<String, Aggregation> aggregationMap = new HashMap<>();
        for (AggregationParamEntity e:aggParam){
            ParseAggregationService service = (ParseAggregationService)SpringContextUtil.getBean(e.getType().toString()+"ParseAggregation");
            aggregationMap.put(e.getName(),service.parseAggregation(e));
        }
        try {
            SearchResponse searchResponse = client.search(searchRequest->searchRequest
                            .index(indexName)
                            .query(query)
                            .aggregations(aggregationMap)
                            .size(0),
                    Map.class);
            Map<String, Aggregate> aggregations = searchResponse.aggregations();
            return aggregations;
        }catch (IOException e){
            log.error("聚合出错 错误原因：",e);
            throw new BizException(0,"聚合出错 出错原因："+e.getMessage());
        }
    }
}
